// Copyright (c) 2022 by Duguang.IO Inc. All Rights Reserved.
// Author: Ethan Liu
// Date: 2022-06-05 09:51:05

package engine

import (
	"bytes"
	"context"
	"errors"
	"io"
	"sync"
	"time"

	logger "jianmu-worker-kube/logging"

	"jianmu-worker-kube/engine/launcher"
	"jianmu-worker-kube/engine/podwatcher"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)

// errPodStopped is returned by Run when the unit's stop channel fires while
// Run is waiting for a container to start or to terminate.
var errPodStopped = errors.New("pod has been stopped")

// Kubernetes is the Kubernetes engine. It creates and deletes the resources
// that back a unit's pod and runs the unit's containers inside that pod.
type Kubernetes struct {
	// client is the API client used for every namespace, secret, configMap
	// and pod call made by the engine.
	client    kubernetes.Interface
	// config is the REST configuration used to build the SPDY executor for
	// pod exec requests.
	config    *rest.Config
	// watchers maps a pod name to its *podwatcher.PodWatcher.
	watchers  *sync.Map
	// launchers maps a pod name to its *launcher.Launcher.
	launchers *sync.Map

	// containerStartTimeout is how long Run waits for a container to start
	// before failing; New raises values below one second to one second.
	containerStartTimeout time.Duration
}

// New returns a Kubernetes engine that talks to the cluster through client
// and uses config for pod exec connections.
//
// containerStartTimeout bounds how long Run waits for a container to start;
// values below one second are raised to one second.
func New(client kubernetes.Interface, config *rest.Config, containerStartTimeout time.Duration) *Kubernetes {
	engine := &Kubernetes{
		client:    client,
		config:    config,
		watchers:  new(sync.Map),
		launchers: new(sync.Map),

		// Start from the one-second floor and only override it with a
		// larger caller-supplied value.
		containerStartTimeout: time.Second,
	}
	if containerStartTimeout > time.Second {
		engine.containerStartTimeout = containerStartTimeout
	}

	return engine
}

// CreatePod creates the resources backing the unit's pod and then the pod
// itself.
//
// Resources are created in this order: the namespace (only when
// unit.Namespace is non-empty), the image pull secret (only when
// unit.PullSecret is set), the secret, the configMap and finally the pod.
// The first failure is logged and returned as-is; anything created before the
// failure is left in place. On success unit.stop is set to a fresh channel.
func (k *Kubernetes) CreatePod(ctx context.Context, unit *Unit) error {
	ns := unit.PodSpec.Namespace
	entry := logger.FromContext(ctx).
		WithField("pod", unit.PodSpec.Name).
		WithField("namespace", ns)

	// Tag the pod spec with the label that FindPods selects on.
	labels := unit.PodSpec.Labels
	if labels == nil {
		labels = make(map[string]string)
		unit.PodSpec.Labels = labels
	}
	labels["dev.jianmu.name"] = unit.PodSpec.Name

	core := k.client.CoreV1()
	createOpts := metav1.CreateOptions{}

	// NOTE(review): the guard reads unit.Namespace while the namespace that
	// gets created is unit.PodSpec.Namespace — confirm this is intended.
	if unit.Namespace != "" {
		if _, err := core.Namespaces().Create(ctx, toNamespace(ns, labels), createOpts); err != nil {
			entry.WithError(err).Error("failed to create namespace")
			return err
		}
		entry.Trace("created namespace")
	}

	if unit.PullSecret != nil {
		if _, err := core.Secrets(ns).Create(ctx, toDockerConfigSecret(unit), createOpts); err != nil {
			entry.WithError(err).Error("failed to create pull secret")
			return err
		}
		entry.Trace("created pull secret")
	}

	if _, err := core.Secrets(ns).Create(ctx, toSecret(unit), createOpts); err != nil {
		entry.WithError(err).Error("failed to create secret")
		return err
	}
	entry.Trace("created secret")

	if _, err := core.ConfigMaps(ns).Create(ctx, toConfigMap(unit), createOpts); err != nil {
		entry.WithError(err).Error("failed to create configMap")
		return err
	}
	entry.Trace("created configMap")

	if _, err := core.Pods(ns).Create(ctx, toPod(unit), createOpts); err != nil {
		entry.WithError(err).Error("failed to create pod")
		return err
	}
	entry.Trace("created pod")

	// Run selects on this channel to abort its waits.
	unit.stop = make(chan struct{})

	return nil
}

// DeletePod removes the resources created for the unit's pod and stops the
// pod's launcher and watcher.
//
// Deletion is best-effort: the pull secret (when unit.PullSecret is set), the
// configMap, the secret and the pod are each deleted independently, and a
// failure is logged without aborting the remaining steps. The method always
// returns nil.
//
// ctx is used only to obtain the logger. The delete calls themselves run with
// context.Background(), so they are not affected by cancellation of ctx.
func (k *Kubernetes) DeletePod(ctx context.Context, unit *Unit) error {
	log := logger.FromContext(ctx).
		WithField("pod", unit.PodSpec.Name).
		WithField("namespace", unit.PodSpec.Namespace)
	if unit.PullSecret != nil {
		if err := k.client.CoreV1().Secrets(unit.PodSpec.Namespace).Delete(context.Background(), unit.PullSecret.Name, metav1.DeleteOptions{}); err != nil {
			log.WithError(err).Error("failed to delete pull secret")
		} else {
			log.Trace("deleted pull secret")
		}
	}

	if err := k.client.CoreV1().ConfigMaps(unit.PodSpec.Namespace).Delete(context.Background(), unit.PodSpec.Name, metav1.DeleteOptions{}); err != nil {
		log.WithError(err).Error("failed to delete configmaps")
	} else {
		log.Trace("deleted configmaps")
	}

	if err := k.client.CoreV1().Secrets(unit.PodSpec.Namespace).Delete(context.Background(), unit.PodSpec.Name, metav1.DeleteOptions{}); err != nil {
		log.WithError(err).Error("failed to delete secret")
	} else {
		log.Trace("deleted secret")
	}

	// NOTE(review): closing unit.stop is disabled here; confirm whether
	// another component is responsible for closing it.
	// close(unit.stop)

	var isPodDeleted bool

	if err := k.client.CoreV1().Pods(unit.PodSpec.Namespace).Delete(context.Background(), unit.PodSpec.Name, metav1.DeleteOptions{}); err != nil {
		log.WithError(err).Error("failed to delete pod")
	} else {
		log.Trace("deleted pod")
		isPodDeleted = true
	}

	if _l, loaded := k.launchers.LoadAndDelete(unit.PodSpec.Name); loaded {
		l := _l.(*launcher.Launcher)
		l.Stop()
	}

	if w, loaded := k.watchers.LoadAndDelete(unit.PodSpec.Name); loaded {
		// Only wait for the deletion event when the delete request was
		// accepted; otherwise the watcher is just dropped from the map.
		if isPodDeleted {
			watcher := w.(*podwatcher.PodWatcher)
			// errors.Is also recognises a wrapped context.Canceled, which a
			// plain != comparison would report as a failure.
			if err := watcher.WaitPodDeleted(); err != nil && !errors.Is(err, context.Canceled) {
				log.WithError(err).Error("PodWatcher terminated with error")
			} else {
				log.Trace("PodWatcher terminated")
			}
		}
	}

	return nil
}

// FindPods lists the pods in namespace selected by the "dev.jianmu.name"
// label selector (the label CreatePod sets on every pod it creates) and
// returns their names.
//
// A list failure is logged and returned together with a nil slice. When no
// pod matches, the returned slice is nil.
func (k *Kubernetes) FindPods(ctx context.Context, namespace string) ([]string, error) {
	log := logger.FromContext(ctx).
		WithField("namespace", namespace)
	opt := metav1.ListOptions{
		LabelSelector: "dev.jianmu.name",
	}
	list, err := k.client.CoreV1().Pods(namespace).List(ctx, opt)
	if err != nil {
		log.WithError(err).Error("无法查询Pods")
		return nil, err
	}

	pods := list.Items
	if len(pods) == 0 {
		// Keep returning a nil slice for an empty result.
		return nil, nil
	}

	// Index into the slice instead of ranging by value so that each (large)
	// Pod struct is not copied, and size the result once up front.
	names := make([]string, len(pods))
	for i := range pods {
		names[i] = pods[i].Name
	}
	return names, nil
}

// Run launches the runner's container inside the unit's pod, streams the
// container's logs to output and waits for the container to terminate.
//
// The steps are:
//  1. Ensure a PodWatcher exists for the pod (started on first use) and
//     register the container with it.
//  2. Ask the pod's launcher to start the container.
//  3. Wait for the container to start, for at most containerStartTimeout.
//  4. Copy the container's logs to output.
//  5. If runner.ResultFile is set, try to read it from the pod; a failure
//     here is logged but does not fail the run.
//  6. Wait for the container to terminate and report its exit code.
//
// On success the returned State has Exited set and carries the exit code and
// the result file content. If unit.stop fires during either wait, Run returns
// errPodStopped. On any error the returned State is nil.
func (k *Kubernetes) Run(ctx context.Context, unit *Unit, runner *Runner, output io.Writer) (state *State, err error) {
	podID := unit.PodSpec.Name
	podNamespace := unit.PodSpec.Namespace
	containerID := runner.ID
	containerImage := runner.Image

	log := logger.FromContext(ctx).
		WithField("pod", podID).
		WithField("namespace", podNamespace).
		WithField("image", containerImage).
		WithField("placeholder", runner.Placeholder).
		WithField("container", containerID).
		WithField("runner", runner.Name)

	// One watcher per pod: only the caller that stored it starts it.
	w, loaded := k.watchers.LoadOrStore(podID, &podwatcher.PodWatcher{})
	watcher := w.(*podwatcher.PodWatcher)
	if !loaded {
		// The watcher serves every runner of the pod, so it is started with
		// a background context rather than this call's ctx.
		watcher.Start(context.Background(), &podwatcher.KubernetesWatcher{
			PodNamespace: podNamespace,
			PodName:      podID,
			KubeClient:   k.client,
			Period:       20 * time.Second,
		})

		log.Trace("PodWatcher started")
	}

	if err = watcher.AddContainer(containerID, runner.Placeholder, containerImage); err != nil {
		return nil, err
	}

	log.Debug("Engine: Starting unit")

	if err = <-k.startContainer(ctx, unit, runner); err != nil {
		return nil, err
	}

	// Capacity 1 lets the waiter goroutine deliver its result and exit even
	// when the timeout or stop branch below wins and nobody receives.
	startErrCh := make(chan error, 1)
	go func() {
		startErrCh <- watcher.WaitContainerStart(containerID)
	}()

	startTimer := time.NewTimer(k.containerStartTimeout)
	defer startTimer.Stop()

	select {
	case err = <-startErrCh:
	case <-startTimer.C:
		err = podwatcher.StartTimeoutContainerError{Container: containerID, Image: containerImage}
		log.WithError(err).Error("Engine: Container start timeout")
	case <-unit.stop:
		return nil, errPodStopped
	}
	if err != nil {
		return nil, err
	}

	if err = k.fetchLogs(ctx, unit, runner, output); err != nil {
		return nil, err
	}

	// Reading the result file is best-effort: a failure is only logged and
	// resultFile stays empty.
	var resultFile string
	if runner.ResultFile != "" {
		var resultErr error
		resultFile, resultErr = k.getResultFile(ctx, unit, runner, runner.ResultFile)
		if resultErr != nil {
			log.WithError(resultErr).WithField("runner.ResultFile", runner.ResultFile).Debugln("无法获取结果文件")
		} else {
			log.Debugln("获取结果文件完成")
			log.Debugln(resultFile)
		}
	}

	type containerResult struct {
		code int
		err  error
	}

	// Buffered for the same reason as startErrCh: the goroutine must not
	// block forever if unit.stop wins the select below.
	stopCh := make(chan containerResult, 1)
	go func() {
		code, err := watcher.WaitContainerTerminated(containerID)
		stopCh <- containerResult{code: code, err: err}
	}()

	select {
	case result := <-stopCh:
		if result.err != nil {
			return nil, result.err
		}

		return &State{
			ExitCode:   result.code,
			Exited:     true,
			OOMKilled:  false,
			ResultFile: resultFile,
		}, nil
	case <-unit.stop:
		return nil, errPodStopped
	}
}

// getResultFile returns the content of the file at filepath inside the unit's
// pod, read by executing `cat filepath` in the "keep-alive" container.
//
// An empty filepath yields an empty string and no error. Only the command's
// stdout is captured; stderr is discarded. Errors from building the executor
// or from the exec stream are returned unchanged with an empty string.
//
// NOTE(review): ctx and runner are not used by this function.
func (k *Kubernetes) getResultFile(ctx context.Context, unit *Unit, runner *Runner, filepath string) (string, error) {
	if filepath == "" {
		return "", nil
	}

	// Stdin and TTY are left at their zero value (disabled).
	execOpts := &v1.PodExecOptions{
		Container: "keep-alive",
		Command:   []string{"cat", filepath},
		Stdout:    true,
		Stderr:    true,
	}

	// Only the URL of this request is used; the executor issues it as a POST.
	execURL := k.client.CoreV1().RESTClient().Get().
		Namespace(unit.PodSpec.Namespace).
		Name(unit.PodSpec.Name).
		Resource("pods").
		SubResource("exec").
		VersionedParams(execOpts, scheme.ParameterCodec).
		URL()

	executor, err := remotecommand.NewSPDYExecutor(k.config, "POST", execURL)
	if err != nil {
		return "", err
	}

	stdout := new(bytes.Buffer)
	streamErr := executor.Stream(remotecommand.StreamOptions{
		Stdout: stdout,
		Stderr: io.Discard,
	})
	if streamErr != nil {
		return "", streamErr
	}

	return stdout.String(), nil
}

// fetchLogs opens a following log stream for the runner's container in the
// unit's pod and copies it to output via cancellableCopy.
//
// A failure to open the stream is logged and returned. Otherwise the result
// of cancellableCopy is returned, and the stream is closed on return.
func (k *Kubernetes) fetchLogs(ctx context.Context, unit *Unit, runner *Runner, output io.Writer) error {
	podName, podNamespace := unit.PodSpec.Name, unit.PodSpec.Namespace

	logStream, err := k.client.CoreV1().RESTClient().Get().
		Namespace(podNamespace).
		Name(podName).
		Resource("pods").
		SubResource("log").
		VersionedParams(&v1.PodLogOptions{
			Follow:    true,
			Container: runner.ID,
		}, scheme.ParameterCodec).
		Stream(ctx)
	if err != nil {
		logger.FromContext(ctx).
			WithError(err).
			WithField("pod", podName).
			WithField("namespace", podNamespace).
			WithField("container", runner.ID).
			WithField("runner", runner.Name).
			Error("failed to stream logs")
		return err
	}
	defer logStream.Close()

	return cancellableCopy(ctx, output, logStream)
}

// startContainer asks the pod's launcher to launch the runner's container and
// returns the channel produced by Launch.
//
// There is one launcher per pod name. The caller that stores a launcher in
// k.launchers also starts it; later callers reuse the stored one.
//
// NOTE(review): the launcher is started with the ctx of whichever call
// created it — confirm that cancelling that ctx should affect later runners
// of the same pod.
func (k *Kubernetes) startContainer(ctx context.Context, unit *Unit, runner *Runner) <-chan error {
	pod := unit.PodSpec.Name

	// A candidate is built up front; LoadOrStore keeps it only when no
	// launcher is registered for this pod yet.
	candidate := launcher.New(pod, unit.PodSpec.Namespace, k.client, &unit.podUpdateMutex)
	stored, existed := k.launchers.LoadOrStore(pod, candidate)

	podLauncher := stored.(*launcher.Launcher)
	if !existed {
		podLauncher.Start(ctx)
	}

	return podLauncher.Launch(runner.ID, runner.Image, runner.Envs)
}
